by Andrew Trask
In [1]:
def pretty_print_review_and_label(i):
    print(labels[i] + "\t:\t" + reviews[i][:80] + "...")
g = open('reviews.txt','r') # What we know!
reviews = list(map(lambda x:x[:-1],g.readlines()))
g.close()
g = open('labels.txt','r') # What we WANT to know!
labels = list(map(lambda x:x[:-1].upper(),g.readlines()))
g.close()
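The [:-1] slice strips the trailing newline from each line. As an aside, an equivalent loading sketch using context managers (assuming the same one-record-per-line files) would be:

with open('reviews.txt') as f:
    reviews = [line.rstrip('\n') for line in f]
with open('labels.txt') as f:
    labels = [line.rstrip('\n').upper() for line in f]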
In [2]:
len(reviews)
Out[2]:
In [3]:
reviews[0]
Out[3]:
In [4]:
labels[0]
Out[4]:
In [5]:
print("labels.txt \t : \t reviews.txt\n")
pretty_print_review_and_label(2137)
pretty_print_review_and_label(12816)
pretty_print_review_and_label(6267)
pretty_print_review_and_label(21934)
pretty_print_review_and_label(5297)
pretty_print_review_and_label(4998)
In [6]:
from collections import Counter
import numpy as np
In [7]:
positive_counts = Counter()
negative_counts = Counter()
total_counts = Counter()
In [8]:
for i in range(len(reviews)):
    if(labels[i] == 'POSITIVE'):
        for word in reviews[i].split(" "):
            positive_counts[word] += 1
            total_counts[word] += 1
    else:
        for word in reviews[i].split(" "):
            negative_counts[word] += 1
            total_counts[word] += 1
In [9]:
positive_counts.most_common()[:15]
Out[9]:
In [10]:
pos_neg_ratios = Counter()
for term, cnt in list(total_counts.most_common()):
    if(cnt > 100):
        pos_neg_ratio = positive_counts[term] / float(negative_counts[term] + 1)
        pos_neg_ratios[term] = pos_neg_ratio

for word, ratio in pos_neg_ratios.most_common():
    if(ratio > 1):
        pos_neg_ratios[word] = np.log(ratio)
    else:
        pos_neg_ratios[word] = -np.log(1 / (ratio + 0.01))
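Taking the log centers the scores at zero: a word seen equally often in both classes lands near 0, positive-leaning words go above 0, and negative-leaning words below. A quick sanity check with hypothetical counts (200 positive vs. 100 negative occurrences, then the reverse):

pos, neg = 200, 100                    # hypothetical counts
print(np.log(pos / float(neg + 1)))    # ~ +0.68, leans positive
ratio = neg / float(pos + 1)           # reversed counts give a ratio < 1
print(-np.log(1 / (ratio + 0.01)))     # ~ -0.68, leans negative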
In [11]:
# words with the strongest positive affinity (highest positive-to-negative log ratios)
pos_neg_ratios.most_common()[:15]
Out[11]:
In [12]:
# words with the strongest negative affinity (lowest log ratios)
list(reversed(pos_neg_ratios.most_common()))[0:30]
Out[12]:
In [13]:
from IPython.display import Image
review = "This was a horrible, terrible movie."
Image(filename='sentiment_network.png')
Out[13]:
In [14]:
review = "The movie was excellent"
Image(filename='sentiment_network_pos.png')
Out[14]:
In [15]:
vocab = set(total_counts.keys())
vocab_size = len(vocab)
print(vocab_size)
In [16]:
list(vocab)[:15]
Out[16]:
In [17]:
import numpy as np
layer_0 = np.zeros((1,vocab_size))
layer_0
Out[17]:
In [18]:
from IPython.display import Image
Image(filename='sentiment_network.png')
Out[18]:
In [19]:
word2index = {}
for i, word in enumerate(vocab):
    word2index[word] = i
word2index_sample = {k: word2index[k] for k in list(word2index.keys())[:15]}
word2index_sample
Out[19]:
In [20]:
def update_input_layer(review):
    global layer_0
    # clear out previous state, reset the layer to be all 0s
    layer_0 *= 0
    for word in review.split(" "):
        layer_0[0][word2index[word]] += 1
update_input_layer(reviews[0])
In [21]:
layer_0
Out[21]:
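As a quick sanity check, repeated words accumulate as counts rather than binary flags (assuming "the" is in the vocabulary, which it is for this corpus); re-run update_input_layer(reviews[0]) afterwards to restore the state shown above:

update_input_layer("the the the")
print(layer_0[0][word2index["the"]])   # 3.0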
In [22]:
def get_target_for_label(label):
    if(label == 'POSITIVE'):
        return 1
    else:
        return 0
In [23]:
labels[0]
Out[23]:
In [24]:
get_target_for_label(labels[0])
Out[24]:
In [25]:
labels[1]
Out[25]:
In [26]:
get_target_for_label(labels[1])
Out[26]:
In [27]:
import time
import sys
import numpy as np
# Let's tweak our network from before to model these phenomena
class SentimentNetwork:
    def __init__(self, reviews, labels, hidden_nodes=10, learning_rate=0.1):
        # set our random number generator
        np.random.seed(1)
        self.pre_process_data(reviews, labels)
        self.init_network(len(self.review_vocab), hidden_nodes, 1, learning_rate)

    def pre_process_data(self, reviews, labels):
        review_vocab = set()
        for review in reviews:
            for word in review.split(" "):
                review_vocab.add(word)
        self.review_vocab = list(review_vocab)

        label_vocab = set()
        for label in labels:
            label_vocab.add(label)
        self.label_vocab = list(label_vocab)

        self.review_vocab_size = len(self.review_vocab)
        self.label_vocab_size = len(self.label_vocab)

        self.word2index = {}
        for i, word in enumerate(self.review_vocab):
            self.word2index[word] = i

        self.label2index = {}
        for i, label in enumerate(self.label_vocab):
            self.label2index[label] = i

    def init_network(self, input_nodes, hidden_nodes, output_nodes, learning_rate):
        # Set number of nodes in input, hidden and output layers.
        self.input_nodes = input_nodes
        self.hidden_nodes = hidden_nodes
        self.output_nodes = output_nodes

        # Initialize weights
        self.weights_0_1 = np.zeros((self.input_nodes, self.hidden_nodes))
        self.weights_1_2 = np.random.normal(0.0, self.output_nodes**-0.5,
                                            (self.hidden_nodes, self.output_nodes))
        self.learning_rate = learning_rate
        self.layer_0 = np.zeros((1, input_nodes))

    def update_input_layer(self, review):
        # clear out previous state, reset the layer to be all 0s
        self.layer_0 *= 0
        for word in review.split(" "):
            if(word in self.word2index.keys()):
                self.layer_0[0][self.word2index[word]] += 1

    def get_target_for_label(self, label):
        if(label == 'POSITIVE'):
            return 1
        else:
            return 0

    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def sigmoid_output_2_derivative(self, output):
        return output * (1 - output)

    def train(self, training_reviews, training_labels):
        assert(len(training_reviews) == len(training_labels))
        correct_so_far = 0
        start = time.time()
        for i in range(len(training_reviews)):
            review = training_reviews[i]
            label = training_labels[i]

            ### Forward pass ###
            # Input Layer
            self.update_input_layer(review)
            # Hidden layer (no nonlinearity)
            layer_1 = self.layer_0.dot(self.weights_0_1)
            # Output layer
            layer_2 = self.sigmoid(layer_1.dot(self.weights_1_2))

            ### Backward pass ###
            # Output error: difference between prediction and target
            layer_2_error = layer_2 - self.get_target_for_label(label)
            layer_2_delta = layer_2_error * self.sigmoid_output_2_derivative(layer_2)

            # Backpropagated error to the hidden layer
            layer_1_error = layer_2_delta.dot(self.weights_1_2.T)
            layer_1_delta = layer_1_error # no nonlinearity, so the gradient equals the error

            # Update the weights with a gradient descent step
            self.weights_1_2 -= layer_1.T.dot(layer_2_delta) * self.learning_rate
            self.weights_0_1 -= self.layer_0.T.dot(layer_1_delta) * self.learning_rate

            if(np.abs(layer_2_error) < 0.5):
                correct_so_far += 1

            reviews_per_second = i / float(time.time() - start)
            sys.stdout.write("\rProgress:" + str(100 * i/float(len(training_reviews)))[:4] \
                             + "% Speed(reviews/sec):" + str(reviews_per_second)[0:5] \
                             + " #Correct:" + str(correct_so_far) + " #Trained:" + str(i+1) \
                             + " Training Accuracy:" + str(correct_so_far * 100 / float(i+1))[:4] + "%")
            if(i % 2500 == 0):
                print("")

    def test(self, testing_reviews, testing_labels):
        correct = 0
        start = time.time()
        for i in range(len(testing_reviews)):
            pred = self.run(testing_reviews[i])
            if(pred == testing_labels[i]):
                correct += 1
            reviews_per_second = i / float(time.time() - start)
            sys.stdout.write("\rProgress:" + str(100 * i/float(len(testing_reviews)))[:4] \
                             + "% Speed(reviews/sec):" + str(reviews_per_second)[0:5] \
                             + " #Correct:" + str(correct) + " #Tested:" + str(i+1) \
                             + " Testing Accuracy:" + str(correct * 100 / float(i+1))[:4] + "%")

    def run(self, review):
        # Input Layer
        self.update_input_layer(review.lower())
        # Hidden layer
        layer_1 = self.layer_0.dot(self.weights_0_1)
        # Output layer
        layer_2 = self.sigmoid(layer_1.dot(self.weights_1_2))
        if(layer_2[0] > 0.5):
            return "POSITIVE"
        else:
            return "NEGATIVE"
In [29]:
mlp = SentimentNetwork(reviews[:-1000],labels[:-1000], learning_rate=0.1)
In [61]:
# evaluate our model before training (just to show how horrible it is)
mlp.test(reviews[-1000:],labels[-1000:])
In [62]:
# train the network
mlp.train(reviews[:-1000],labels[:-1000])
In [63]:
mlp = SentimentNetwork(reviews[:-1000],labels[:-1000], learning_rate=0.01)
In [64]:
# train the network
mlp.train(reviews[:-1000],labels[:-1000])
In [65]:
mlp = SentimentNetwork(reviews[:-1000],labels[:-1000], learning_rate=0.001)
In [66]:
# train the network
mlp.train(reviews[:-1000],labels[:-1000])
In [28]:
from IPython.display import Image
Image(filename='sentiment_network.png')
Out[28]:
In [29]:
def update_input_layer(review):
    global layer_0
    # clear out previous state, reset the layer to be all 0s
    layer_0 *= 0
    for word in review.split(" "):
        layer_0[0][word2index[word]] += 1
update_input_layer(reviews[0])
In [30]:
layer_0
Out[30]:
In [31]:
review_counter = Counter()
In [32]:
for word in reviews[0].split(" "):
    review_counter[word] += 1
In [33]:
review_counter.most_common()[:15]
Out[33]:
In [34]:
import time
import sys
import numpy as np
# Let's tweak our network from before to model these phenomena
class SentimentNetwork:
    def __init__(self, reviews, labels, hidden_nodes=10, learning_rate=0.1):
        # set our random number generator
        np.random.seed(1)
        self.pre_process_data(reviews, labels)
        self.init_network(len(self.review_vocab), hidden_nodes, 1, learning_rate)

    def pre_process_data(self, reviews, labels):
        review_vocab = set()
        for review in reviews:
            for word in review.split(" "):
                review_vocab.add(word)
        self.review_vocab = list(review_vocab)

        label_vocab = set()
        for label in labels:
            label_vocab.add(label)
        self.label_vocab = list(label_vocab)

        self.review_vocab_size = len(self.review_vocab)
        self.label_vocab_size = len(self.label_vocab)

        self.word2index = {}
        for i, word in enumerate(self.review_vocab):
            self.word2index[word] = i

        self.label2index = {}
        for i, label in enumerate(self.label_vocab):
            self.label2index[label] = i

    def init_network(self, input_nodes, hidden_nodes, output_nodes, learning_rate):
        # Set number of nodes in input, hidden and output layers.
        self.input_nodes = input_nodes
        self.hidden_nodes = hidden_nodes
        self.output_nodes = output_nodes

        # Initialize weights
        self.weights_0_1 = np.zeros((self.input_nodes, self.hidden_nodes))
        self.weights_1_2 = np.random.normal(0.0, self.output_nodes**-0.5,
                                            (self.hidden_nodes, self.output_nodes))
        self.learning_rate = learning_rate
        self.layer_0 = np.zeros((1, input_nodes))

    def update_input_layer(self, review):
        # clear out previous state, reset the layer to be all 0s
        self.layer_0 *= 0
        for word in review.split(" "):
            if(word in self.word2index.keys()):
                self.layer_0[0][self.word2index[word]] = 1

    def get_target_for_label(self, label):
        if(label == 'POSITIVE'):
            return 1
        else:
            return 0

    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def sigmoid_output_2_derivative(self, output):
        return output * (1 - output)

    def train(self, training_reviews, training_labels):
        assert(len(training_reviews) == len(training_labels))
        correct_so_far = 0
        start = time.time()
        for i in range(len(training_reviews)):
            review = training_reviews[i]
            label = training_labels[i]

            ### Forward pass ###
            # Input Layer
            self.update_input_layer(review)
            # Hidden layer (no nonlinearity)
            layer_1 = self.layer_0.dot(self.weights_0_1)
            # Output layer
            layer_2 = self.sigmoid(layer_1.dot(self.weights_1_2))

            ### Backward pass ###
            # Output error: difference between prediction and target
            layer_2_error = layer_2 - self.get_target_for_label(label)
            layer_2_delta = layer_2_error * self.sigmoid_output_2_derivative(layer_2)

            # Backpropagated error to the hidden layer
            layer_1_error = layer_2_delta.dot(self.weights_1_2.T)
            layer_1_delta = layer_1_error # no nonlinearity, so the gradient equals the error

            # Update the weights with a gradient descent step
            self.weights_1_2 -= layer_1.T.dot(layer_2_delta) * self.learning_rate
            self.weights_0_1 -= self.layer_0.T.dot(layer_1_delta) * self.learning_rate

            if(np.abs(layer_2_error) < 0.5):
                correct_so_far += 1

            reviews_per_second = i / float(time.time() - start)
            sys.stdout.write("\rProgress:" + str(100 * i/float(len(training_reviews)))[:4] \
                             + "% Speed(reviews/sec):" + str(reviews_per_second)[0:5] \
                             + " #Correct:" + str(correct_so_far) + " #Trained:" + str(i+1) \
                             + " Training Accuracy:" + str(correct_so_far * 100 / float(i+1))[:4] + "%")
            if(i % 2500 == 0):
                print("")

    def test(self, testing_reviews, testing_labels):
        correct = 0
        start = time.time()
        for i in range(len(testing_reviews)):
            pred = self.run(testing_reviews[i])
            if(pred == testing_labels[i]):
                correct += 1
            reviews_per_second = i / float(time.time() - start)
            sys.stdout.write("\rProgress:" + str(100 * i/float(len(testing_reviews)))[:4] \
                             + "% Speed(reviews/sec):" + str(reviews_per_second)[0:5] \
                             + " #Correct:" + str(correct) + " #Tested:" + str(i+1) \
                             + " Testing Accuracy:" + str(correct * 100 / float(i+1))[:4] + "%")

    def run(self, review):
        # Input Layer
        self.update_input_layer(review.lower())
        # Hidden layer
        layer_1 = self.layer_0.dot(self.weights_0_1)
        # Output layer
        layer_2 = self.sigmoid(layer_1.dot(self.weights_1_2))
        if(layer_2[0] > 0.5):
            return "POSITIVE"
        else:
            return "NEGATIVE"
In [37]:
mlp = SentimentNetwork(reviews[:-1000],labels[:-1000], learning_rate=0.1)
In [84]:
mlp.train(reviews[:-1000],labels[:-1000])
In [85]:
# evaluate the trained model on the last 1,000 reviews
mlp.test(reviews[-1000:],labels[-1000:])
In [35]:
Image(filename='sentiment_network_sparse.png')
Out[35]:
In [36]:
layer_0 = np.zeros(10)
In [37]:
layer_0
Out[37]:
In [38]:
layer_0[4] = 1
layer_0[9] = 1
In [39]:
layer_0
Out[39]:
In [40]:
weights_0_1 = np.random.randn(10,5)
In [41]:
layer_0.dot(weights_0_1)
Out[41]:
In [42]:
indices = [4,9]
In [43]:
layer_1 = np.zeros(5)
In [44]:
for index in indices:
    layer_1 += weights_0_1[index]
In [45]:
layer_1
Out[45]:
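Summing just the selected rows gives exactly the same result as the full matrix product, because every other input element is zero. A quick check using layer_0 and weights_0_1 from the cells above:

print(np.allclose(layer_1, layer_0.dot(weights_0_1)))   # True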
In [46]:
Image(filename='sentiment_network_sparse_2.png')
Out[46]:
In [47]:
import time
import sys
# Let's tweak our network from before to model these phenomena
class SentimentNetwork:
    def __init__(self, reviews, labels, hidden_nodes=10, learning_rate=0.1):
        np.random.seed(1)
        self.pre_process_data(reviews, labels)
        self.init_network(len(self.review_vocab), hidden_nodes, 1, learning_rate)

    def pre_process_data(self, reviews, labels):
        review_vocab = set()
        for review in reviews:
            for word in review.split(" "):
                review_vocab.add(word)
        self.review_vocab = list(review_vocab)

        label_vocab = set()
        for label in labels:
            label_vocab.add(label)
        self.label_vocab = list(label_vocab)

        self.review_vocab_size = len(self.review_vocab)
        self.label_vocab_size = len(self.label_vocab)

        self.word2index = {}
        for i, word in enumerate(self.review_vocab):
            self.word2index[word] = i

        self.label2index = {}
        for i, label in enumerate(self.label_vocab):
            self.label2index[label] = i

    def init_network(self, input_nodes, hidden_nodes, output_nodes, learning_rate):
        # Set number of nodes in input, hidden and output layers.
        self.input_nodes = input_nodes
        self.hidden_nodes = hidden_nodes
        self.output_nodes = output_nodes

        # Initialize weights
        self.weights_0_1 = np.zeros((self.input_nodes, self.hidden_nodes))
        self.weights_1_2 = np.random.normal(0.0, self.output_nodes**-0.5,
                                            (self.hidden_nodes, self.output_nodes))
        self.learning_rate = learning_rate
        self.layer_0 = np.zeros((1, input_nodes))
        self.layer_1 = np.zeros((1, hidden_nodes))

    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def sigmoid_output_2_derivative(self, output):
        return output * (1 - output)

    def update_input_layer(self, review):
        # clear out previous state, reset the layer to be all 0s
        self.layer_0 *= 0
        for word in review.split(" "):
            if(word in self.word2index.keys()):
                self.layer_0[0][self.word2index[word]] = 1

    def get_target_for_label(self, label):
        if(label == 'POSITIVE'):
            return 1
        else:
            return 0

    def train(self, training_reviews_raw, training_labels):
        # pre-compute the set of non-zero input indices for each review
        training_reviews = list()
        for review in training_reviews_raw:
            indices = set()
            for word in review.split(" "):
                if(word in self.word2index.keys()):
                    indices.add(self.word2index[word])
            training_reviews.append(list(indices))

        assert(len(training_reviews) == len(training_labels))
        correct_so_far = 0
        start = time.time()
        for i in range(len(training_reviews)):
            review = training_reviews[i]
            label = training_labels[i]

            ### Forward pass ###
            # Hidden layer: instead of layer_0.dot(weights_0_1), just sum
            # the weight rows for the words that are present
            self.layer_1 *= 0
            for index in review:
                self.layer_1 += self.weights_0_1[index]

            # Output layer
            layer_2 = self.sigmoid(self.layer_1.dot(self.weights_1_2))

            ### Backward pass ###
            # Output error: difference between prediction and target
            layer_2_error = layer_2 - self.get_target_for_label(label)
            layer_2_delta = layer_2_error * self.sigmoid_output_2_derivative(layer_2)

            # Backpropagated error to the hidden layer
            layer_1_error = layer_2_delta.dot(self.weights_1_2.T)
            layer_1_delta = layer_1_error # no nonlinearity, so the gradient equals the error

            # Update the weights; only the rows for words that are present
            # receive a gradient, so only those rows are touched
            self.weights_1_2 -= self.layer_1.T.dot(layer_2_delta) * self.learning_rate
            for index in review:
                self.weights_0_1[index] -= layer_1_delta[0] * self.learning_rate

            if(np.abs(layer_2_error) < 0.5):
                correct_so_far += 1

            reviews_per_second = i / float(time.time() - start)
            sys.stdout.write("\rProgress:" + str(100 * i/float(len(training_reviews)))[:4] \
                             + "% Speed(reviews/sec):" + str(reviews_per_second)[0:5] \
                             + " #Correct:" + str(correct_so_far) + " #Trained:" + str(i+1) \
                             + " Training Accuracy:" + str(correct_so_far * 100 / float(i+1))[:4] + "%")

    def test(self, testing_reviews, testing_labels):
        correct = 0
        start = time.time()
        for i in range(len(testing_reviews)):
            pred = self.run(testing_reviews[i])
            if(pred == testing_labels[i]):
                correct += 1
            reviews_per_second = i / float(time.time() - start)
            sys.stdout.write("\rProgress:" + str(100 * i/float(len(testing_reviews)))[:4] \
                             + "% Speed(reviews/sec):" + str(reviews_per_second)[0:5] \
                             + " #Correct:" + str(correct) + " #Tested:" + str(i+1) \
                             + " Testing Accuracy:" + str(correct * 100 / float(i+1))[:4] + "%")

    def run(self, review):
        # Hidden layer: sum the weight rows for the unique known words
        self.layer_1 *= 0
        unique_indices = set()
        for word in review.lower().split(" "):
            if(word in self.word2index.keys()):
                unique_indices.add(self.word2index[word])
        for index in unique_indices:
            self.layer_1 += self.weights_0_1[index]

        # Output layer
        layer_2 = self.sigmoid(self.layer_1.dot(self.weights_1_2))
        if(layer_2[0] > 0.5):
            return "POSITIVE"
        else:
            return "NEGATIVE"
In [51]:
mlp = SentimentNetwork(reviews[:-1000],labels[:-1000], learning_rate=0.1)
In [111]:
mlp.train(reviews[:-1000],labels[:-1000])
In [109]:
# evaluate our model on the last 1,000 reviews
mlp.test(reviews[-1000:],labels[-1000:])
In [48]:
Image(filename='sentiment_network_sparse_2.png')
Out[48]:
In [49]:
# words with the strongest positive affinity (highest positive-to-negative log ratios)
pos_neg_ratios.most_common()[:15]
Out[49]:
In [50]:
# words with the strongest negative affinity (lowest log ratios)
list(reversed(pos_neg_ratios.most_common()))[0:30]
Out[50]:
In [51]:
from bokeh.models import ColumnDataSource, LabelSet
from bokeh.plotting import figure, show, output_file
from bokeh.io import output_notebook
output_notebook()
In [52]:
hist, edges = np.histogram(list(map(lambda x: x[1], pos_neg_ratios.most_common())), density=True, bins=100)
p = figure(tools="pan,wheel_zoom,reset,save",
           toolbar_location="above",
           title="Word Positive/Negative Affinity Distribution")
p.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:], line_color="#555555")
show(p)
In [53]:
frequency_frequency = Counter()
for word, cnt in total_counts.most_common():
    frequency_frequency[cnt] += 1
In [54]:
hist, edges = np.histogram(list(map(lambda x: x[1], frequency_frequency.most_common())), density=True, bins=100)
p = figure(tools="pan,wheel_zoom,reset,save",
           toolbar_location="above",
           title="The frequency distribution of the words in our corpus")
p.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:], line_color="#555555")
show(p)
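The distribution is heavily skewed toward rare words, which is what makes vocabulary pruning attractive in the next network. One quick way to quantify the tail, using total_counts from earlier:

rare = sum(1 for word, cnt in total_counts.items() if cnt < 10)
print(rare, "of", len(total_counts), "vocabulary words appear fewer than 10 times")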
In [55]:
import time
import sys
import numpy as np
# Let's tweak our network from before to model these phenomena
class SentimentNetwork:
    def __init__(self, reviews, labels, min_count=10, polarity_cutoff=0.1, hidden_nodes=10, learning_rate=0.1):
        np.random.seed(1)
        self.pre_process_data(reviews, labels, polarity_cutoff, min_count)
        self.init_network(len(self.review_vocab), hidden_nodes, 1, learning_rate)

    def pre_process_data(self, reviews, labels, polarity_cutoff, min_count):
        positive_counts = Counter()
        negative_counts = Counter()
        total_counts = Counter()
        for i in range(len(reviews)):
            if(labels[i] == 'POSITIVE'):
                for word in reviews[i].split(" "):
                    positive_counts[word] += 1
                    total_counts[word] += 1
            else:
                for word in reviews[i].split(" "):
                    negative_counts[word] += 1
                    total_counts[word] += 1

        pos_neg_ratios = Counter()
        for term, cnt in list(total_counts.most_common()):
            if(cnt >= 50):
                pos_neg_ratio = positive_counts[term] / float(negative_counts[term] + 1)
                pos_neg_ratios[term] = pos_neg_ratio
        for word, ratio in pos_neg_ratios.most_common():
            if(ratio > 1):
                pos_neg_ratios[word] = np.log(ratio)
            else:
                pos_neg_ratios[word] = -np.log(1 / (ratio + 0.01))

        # keep only words that are frequent enough and, when scored,
        # polarized enough
        review_vocab = set()
        for review in reviews:
            for word in review.split(" "):
                if(total_counts[word] > min_count):
                    if(word in pos_neg_ratios.keys()):
                        if((pos_neg_ratios[word] >= polarity_cutoff) or (pos_neg_ratios[word] <= -polarity_cutoff)):
                            review_vocab.add(word)
                    else:
                        review_vocab.add(word)
        self.review_vocab = list(review_vocab)

        label_vocab = set()
        for label in labels:
            label_vocab.add(label)
        self.label_vocab = list(label_vocab)

        self.review_vocab_size = len(self.review_vocab)
        self.label_vocab_size = len(self.label_vocab)

        self.word2index = {}
        for i, word in enumerate(self.review_vocab):
            self.word2index[word] = i

        self.label2index = {}
        for i, label in enumerate(self.label_vocab):
            self.label2index[label] = i

    def init_network(self, input_nodes, hidden_nodes, output_nodes, learning_rate):
        # Set number of nodes in input, hidden and output layers.
        self.input_nodes = input_nodes
        self.hidden_nodes = hidden_nodes
        self.output_nodes = output_nodes

        # Initialize weights
        self.weights_0_1 = np.zeros((self.input_nodes, self.hidden_nodes))
        self.weights_1_2 = np.random.normal(0.0, self.output_nodes**-0.5,
                                            (self.hidden_nodes, self.output_nodes))
        self.learning_rate = learning_rate
        self.layer_0 = np.zeros((1, input_nodes))
        self.layer_1 = np.zeros((1, hidden_nodes))

    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def sigmoid_output_2_derivative(self, output):
        return output * (1 - output)

    def update_input_layer(self, review):
        # clear out previous state, reset the layer to be all 0s
        self.layer_0 *= 0
        for word in review.split(" "):
            if(word in self.word2index.keys()):
                self.layer_0[0][self.word2index[word]] = 1

    def get_target_for_label(self, label):
        if(label == 'POSITIVE'):
            return 1
        else:
            return 0

    def train(self, training_reviews_raw, training_labels):
        # pre-compute the set of non-zero input indices for each review
        training_reviews = list()
        for review in training_reviews_raw:
            indices = set()
            for word in review.split(" "):
                if(word in self.word2index.keys()):
                    indices.add(self.word2index[word])
            training_reviews.append(list(indices))

        assert(len(training_reviews) == len(training_labels))
        correct_so_far = 0
        start = time.time()
        for i in range(len(training_reviews)):
            review = training_reviews[i]
            label = training_labels[i]

            ### Forward pass ###
            # Hidden layer: sum the weight rows for the words that are present
            self.layer_1 *= 0
            for index in review:
                self.layer_1 += self.weights_0_1[index]

            # Output layer
            layer_2 = self.sigmoid(self.layer_1.dot(self.weights_1_2))

            ### Backward pass ###
            # Output error: difference between prediction and target
            layer_2_error = layer_2 - self.get_target_for_label(label)
            layer_2_delta = layer_2_error * self.sigmoid_output_2_derivative(layer_2)

            # Backpropagated error to the hidden layer
            layer_1_error = layer_2_delta.dot(self.weights_1_2.T)
            layer_1_delta = layer_1_error # no nonlinearity, so the gradient equals the error

            # Update the weights; only the rows for present words are touched
            self.weights_1_2 -= self.layer_1.T.dot(layer_2_delta) * self.learning_rate
            for index in review:
                self.weights_0_1[index] -= layer_1_delta[0] * self.learning_rate

            if(layer_2 >= 0.5 and label == 'POSITIVE'):
                correct_so_far += 1
            if(layer_2 < 0.5 and label == 'NEGATIVE'):
                correct_so_far += 1

            reviews_per_second = i / float(time.time() - start)
            sys.stdout.write("\rProgress:" + str(100 * i/float(len(training_reviews)))[:4] \
                             + "% Speed(reviews/sec):" + str(reviews_per_second)[0:5] \
                             + " #Correct:" + str(correct_so_far) + " #Trained:" + str(i+1) \
                             + " Training Accuracy:" + str(correct_so_far * 100 / float(i+1))[:4] + "%")

    def test(self, testing_reviews, testing_labels):
        correct = 0
        start = time.time()
        for i in range(len(testing_reviews)):
            pred = self.run(testing_reviews[i])
            if(pred == testing_labels[i]):
                correct += 1
            reviews_per_second = i / float(time.time() - start)
            sys.stdout.write("\rProgress:" + str(100 * i/float(len(testing_reviews)))[:4] \
                             + "% Speed(reviews/sec):" + str(reviews_per_second)[0:5] \
                             + " #Correct:" + str(correct) + " #Tested:" + str(i+1) \
                             + " Testing Accuracy:" + str(correct * 100 / float(i+1))[:4] + "%")

    def run(self, review):
        # Hidden layer: sum the weight rows for the unique known words
        self.layer_1 *= 0
        unique_indices = set()
        for word in review.lower().split(" "):
            if(word in self.word2index.keys()):
                unique_indices.add(self.word2index[word])
        for index in unique_indices:
            self.layer_1 += self.weights_0_1[index]

        # Output layer
        layer_2 = self.sigmoid(self.layer_1.dot(self.weights_1_2))
        if(layer_2[0] >= 0.5):
            return "POSITIVE"
        else:
            return "NEGATIVE"
In [69]:
mlp = SentimentNetwork(reviews[:-1000],labels[:-1000],min_count=20,polarity_cutoff=0.05,learning_rate=0.01)
In [70]:
mlp.train(reviews[:-1000],labels[:-1000])
In [71]:
mlp.test(reviews[-1000:],labels[-1000:])
In [75]:
mlp = SentimentNetwork(reviews[:-1000],labels[:-1000],min_count=20,polarity_cutoff=0.7,learning_rate=0.01)
In [76]:
mlp.train(reviews[:-1000],labels[:-1000])
In [77]:
mlp.test(reviews[-1000:],labels[-1000:])